#include "IOToAliyun.h"

#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>
#ifdef WIN32
#define usleep(x) Sleep(x)
#endif // WIN32

#include "business_def.h"
#include "ProducerMqttAliyun.h"
#include "ConsumerMqttAliyun.h"
#include "File.h"
#include "pfunc_time.h"
#include "Log.h"

#include "AliyunIotFunc.h"
#include "svc_control.h"

IOToMqttAliyun::IOToMqttAliyun(void)
	: running(true)
	, initLink(false)
	, pclient(NULL)
	, h_ota(NULL)
	, msg_buf(NULL)
	, msg_readbuf(NULL)
	, cnt(1)
	, producer(NULL)
	, consumer(NULL)
	, send_queue(QueueDataSingle<WCacheAliyun>::getInstance())
	// , rec_queue(QueueDataSingle<RCacheAliyun>::getInstance())
{
	init();
}

IOToMqttAliyun::~IOToMqttAliyun(void)
{
	running = false;
	uninit();
}

void IOToMqttAliyun::init()
{
	//get config and create it to map this interface
	init_config();
	//sub topic init
	init_sub_top();
	//
	producer = new ProducerMqttAliyun(svcTriples);
	producer->start(true);
	//
	consumer = new ConsumerMqttAliyun(subTopicMaps);
	consumer->start(true);
	//init iot interface
	init_iot();
};

void IOToMqttAliyun::init_config()
{
	BusinessDef *ptr_BD = BusinessDef::getInstance();
	gatewayTriples = ptr_BD->getGateway();
	//
	pyfree::OTAAliyun ota_aliyun = ptr_BD->getOTAAliyun();
	otaInfo.form(ota_aliyun);
#ifdef WIN32
	otaInfo.pathdiv = "\\";
#else
	otaInfo.pathdiv = "/";
#endif // WIN32
	//
	char dir_[256] = { 0 };
	if(!pyfree::getCurrentDir(dir_,256))
	{
		CLogger::createInstance()->Log(MsgInfo, "%s|%03d :: get current dir is error!"
			, __func__, __LINE__);
	}
	otaInfo.appDir = std::string(dir_, strlen(dir_));

	if (otaInfo.update_dir.empty()) {
		otaInfo.update_dir = otaInfo.appDir + otaInfo.pathdiv + "update";
	}
	//create dir
	pyfree::createDir(otaInfo.update_dir);
	otaInfo.version_file = otaInfo.update_dir + otaInfo.pathdiv + otaInfo.version_file;
	otaInfo.update_file = otaInfo.update_dir + otaInfo.pathdiv + otaInfo.update_file;
	otaInfo.batfile = otaInfo.update_dir + otaInfo.pathdiv + otaInfo.batfile;
}

void IOToMqttAliyun::init_sub_top()
{
	BusinessDef *ptr_BD = BusinessDef::getInstance();
	ptr_BD->getSvcInfo(svcTriples);
	{
		AliyunServiceDesc topicMapDesc;
		topicMapDesc.topicType = TOPIC_SERVICE_PRO_SET;
		for (std::map<int, SvcDesc>::iterator it = svcTriples.begin(); it != svcTriples.end(); ++it)
		{
			pyfree::ServiceInfo svcinfo;
			svcinfo.svc_name = it->second.svc_name;
			svcinfo.app_dir = it->second.app_dir;
			svcinfo.app_name = it->second.app_name;
			topicMapDesc.triples[it->second.aliyun_key] = svcinfo;
		}
		char buf[128] = { 0 };
		sprintf(buf, ALINK_TOPIC_SERVICE_PRO_SET
			, gatewayTriples.product_key.c_str()
			, gatewayTriples.device_name.c_str());
		std::string alink_topic_service_pro_set = std::string(buf, strlen(buf));
		subTopicMaps[alink_topic_service_pro_set] = topicMapDesc;
		sprintf(buf, ALINK_TOPIC_EVENT_PRO_POST_REPLY
			, gatewayTriples.product_key.c_str()
			, gatewayTriples.device_name.c_str());
		std::string alink_topic_service_pro_post = std::string(buf, strlen(buf));
		subTopicMaps[alink_topic_service_pro_post] = topicMapDesc;
	}
}

void IOToMqttAliyun::init_iot()
{
	if (NULL == (msg_buf = (char *)HAL_Malloc(MQTT_MSGLEN))) {
		CLogger::createInstance()->Log(MsgInfo, "%s|%03d :: not enough memory"
			, __func__, __LINE__);
	}

	if (NULL == (msg_readbuf = (char *)HAL_Malloc(MQTT_MSGLEN))) {
		CLogger::createInstance()->Log(MsgInfo, "%s|%03d :: not enough memory"
			, __func__, __LINE__);
	}
	IOT_OpenLog("mqtt");
	IOT_SetLogLevel(IOT_LOG_DEBUG);

	HAL_SetProductKey((char*)gatewayTriples.product_key.c_str());
	HAL_SetProductSecret((char*)gatewayTriples.product_secret.c_str());
	HAL_SetDeviceName((char*)gatewayTriples.device_name.c_str());
	HAL_SetDeviceSecret((char*)gatewayTriples.device_secret.c_str());
}

void IOToMqttAliyun::init_ota()
{
	if(NULL==pclient)
	{
		CLogger::createInstance()->Log(MsgInfo, "%s|%03d :: pclient is null"
			, __func__, __LINE__);
		return;
	}
	h_ota = IOT_OTA_Init(gatewayTriples.product_key.c_str()
		, gatewayTriples.device_name.c_str(), pclient);
	if (NULL == h_ota) {
		EXAMPLE_TRACE("initialize OTA failed(%s,%s)"
		, gatewayTriples.product_key.c_str()
		, gatewayTriples.device_name.c_str());
	}
#ifdef WIN32
	char version_def[128] = "iotx_win_ver_1.0.0";
#else
	char version_def[128] = "iotx_linux_ver_1.0.0";
#endif
	getVersion(version_def);
	if (0 != IOT_OTA_ReportVersion(h_ota, version_def)) {
		EXAMPLE_TRACE("report OTA version failed");
	}
	HAL_SleepMs(1000);
}

void IOToMqttAliyun::construct_iot()
{
	iotx_conn_info_pt pconn_info;
	iotx_mqtt_param_t mqtt_params;
	
	/* Device AUTH */
	if (0 != IOT_SetupConnInfo(gatewayTriples.product_key.c_str()
		, gatewayTriples.device_name.c_str()
		, gatewayTriples.device_secret.c_str()
		, (void **)&pconn_info)) 
	{
		CLogger::createInstance()->Log(MsgInfo
			, "%s|%03d :: AUTH request failed!"
			, __func__, __LINE__);
		HAL_SleepMs(10000);
		return;
	}
	/* Initialize MQTT parameter */
	memset(&mqtt_params, 0x0, sizeof(mqtt_params));

	mqtt_params.port = pconn_info->port;
	mqtt_params.host = pconn_info->host_name;
	mqtt_params.client_id = pconn_info->client_id;
	mqtt_params.username = pconn_info->username;
	mqtt_params.password = pconn_info->password;
	mqtt_params.pub_key = pconn_info->pub_key;

	mqtt_params.request_timeout_ms = 2000;
	mqtt_params.clean_session = 0;
	mqtt_params.keepalive_interval_ms = 300000;
	//
	mqtt_params.pread_buf = msg_readbuf;
	mqtt_params.read_buf_size = MQTT_MSGLEN;
	mqtt_params.pwrite_buf = msg_buf;
	mqtt_params.write_buf_size = MQTT_MSGLEN;

	mqtt_params.handle_event.h_fp = event_handle;
	mqtt_params.handle_event.pcontext = NULL;

	/* Construct a MQTT client with specify parameter */
	pclient = IOT_MQTT_Construct(&mqtt_params);
	if (NULL == pclient) {
		CLogger::createInstance()->Log(MsgInfo
			, "%s|%03d :: MQTT construct failed"
			, __func__, __LINE__);
		HAL_SleepMs(10000);
	}
}

void IOToMqttAliyun::uninit()
{
	if (NULL != producer)
	{
		delete producer;
		producer = NULL;
	}
	if (NULL != consumer)
	{
		delete consumer;
		consumer = NULL;
	}
	if (NULL != msg_buf) {
		HAL_Free(msg_buf);
	}

	if (NULL != msg_readbuf) {
		HAL_Free(msg_readbuf);
	}
	IOT_DumpMemoryStats(IOT_LOG_DEBUG);
	IOT_CloseLog();
};

void IOToMqttAliyun::create()
{
	//create MQTT client for aliyun iot 
	construct_iot();
	//void *h_ota = NULL;
	init_ota();
}

void IOToMqttAliyun::destroy()
{
	if (NULL != h_ota) {
		IOT_OTA_Deinit(h_ota);
	}
	if (NULL != pclient) {
		IOT_MQTT_Destroy(&pclient);
	}
}

void IOToMqttAliyun::subscribe()
{
	if(NULL==pclient)
	{
		CLogger::createInstance()->Log(MsgInfo, "%s|%03d :: pclient is null"
			, __func__, __LINE__);
		return;
	}
	int rc = 0;
	//如果涉及的被管理的服务太多,就不能针对每个服务态势进行消费订购,需要采用通用回到函数处置,待后期实现
	for (std::map<std::string, AliyunServiceDesc>::iterator it = subTopicMaps.begin();
		it != subTopicMaps.end(); ++it)
	{
		switch (it->second.topicType)
		{
		case TOPIC_EVENT_PRO_POST_REPLY:
			rc = IOT_MQTT_Subscribe(pclient, it->first.c_str(), IOTX_MQTT_QOS0, push_reply_message_arrive, NULL);
			break;
		case TOPIC_SERVICE_PRO_SET:
			rc = IOT_MQTT_Subscribe(pclient, it->first.c_str(), IOTX_MQTT_QOS0, service_set_message_arrive, NULL);
			break;
		default:
			continue;
		}
		if (rc < 0) {
			// printf("%s|%03d :: IOT_MQTT_Subscribe(%s) failed, rc = %d"
			// 	, __func__, __LINE__, it->first.c_str(), rc);
			CLogger::createInstance()->Log(MsgInfo
				, "%s|%03d :: IOT_MQTT_Subscribe(%s) failed, rc = %d"
				, __func__, __LINE__, it->first.c_str(), rc);
		}
		else {
			IOT_MQTT_Yield(pclient, 200);
			Print_NOTICE("IOT_MQTT_Subscribe(%s) success!\n", it->first.c_str());
		}
	}
}

void IOToMqttAliyun::unsubscribe()
{
	for (std::map<std::string, AliyunServiceDesc>::iterator it = subTopicMaps.begin();
		it != subTopicMaps.end(); ++it)
	{
		IOT_MQTT_Unsubscribe(pclient, it->first.c_str());
		IOT_MQTT_Yield(pclient, 200);
	}
}

void IOToMqttAliyun::send()
{
	WCacheAliyun it;
	if (send_queue->pop(it))
	{
		std::map<int, SvcDesc>::iterator it_svc = svcTriples.find(it.id);
		if (it_svc != svcTriples.end())
		{
			char buf_params[128] = { 0 };
			sprintf(buf_params, "{\"%s\":%d}", it_svc->second.aliyun_key.c_str(),it.val);
			iotx_mqtt_topic_info_t topic_msg;
			memset(&topic_msg, 0x0, sizeof(iotx_mqtt_topic_info_t));
			int msg_len = 0;
			char msg_pub[MQTT_MSGLEN] = { 0 };
			topic_msg.qos = IOTX_MQTT_QOS0;
			topic_msg.retain = 0;
			topic_msg.dup = 0;
			topic_msg.payload = (char *)msg_pub;
			topic_msg.payload_len = msg_len;
			memset(msg_pub, 0x0, MQTT_MSGLEN);
			msg_len = sprintf(msg_pub, PAYLOAD_FORMAT, cnt++, ALINK_METHOD_PROP_POST, buf_params);
			if (msg_len > 0) {
				topic_msg.payload = (char *)msg_pub;
				topic_msg.payload_len = msg_len;
				char buf[128] = { 0 };
				sprintf(buf, ALINK_TOPIC_EVENT_PRO_POST
					, gatewayTriples.product_key.c_str()
					, gatewayTriples.device_name.c_str());
				std::string alink_topic_service_pro_post = std::string(buf, strlen(buf));
				if (!alink_topic_service_pro_post.empty()) {
					int rc = IOT_MQTT_Publish(pclient, alink_topic_service_pro_post.c_str(), &topic_msg);
					if (rc < 0) {
						//EXAMPLE_TRACE("error occur when publish");
						CLogger::createInstance()->Log(MsgInfo
							, "%s|%03d :: \n publish message fail: \n topic: %s \n payload(%d): %s \n rc = %d"
							, __func__, __LINE__
							, alink_topic_service_pro_post.c_str()
							, topic_msg.payload_len, topic_msg.payload, rc);
					}
					else {
						IOT_MQTT_Yield(pclient, 200);
						EXAMPLE_TRACE("packet-id=%u, publish topic msg=%s", (uint32_t)rc, msg_pub);
					}
				}
			}
		}
	}
}

void IOToMqttAliyun::ota_down()
{
	if (IOT_OTA_IsFetching(h_ota)) 
	{
		char buf_ota[OTA_BUF_LEN];
		FILE *fp;
		if (NULL == (fp = fopen(otaInfo.update_file.c_str(), "wb+"))) {
			EXAMPLE_TRACE("open file failed");
			return;
		}
		uint32_t firmware_valid;
		uint32_t last_percent = 0, percent = 0;
		char version[128], md5sum[33];
		uint32_t len, size_downloaded, size_file;
		/* get OTA information */
		//IOT_OTA_Ioctl(h_ota, IOT_OTAG_FETCHED_SIZE, &size_downloaded, 4);
		//IOT_OTA_Ioctl(h_ota, IOT_OTAG_FILE_SIZE, &size_file, 4);
		//IOT_OTA_Ioctl(h_ota, IOT_OTAG_MD5SUM, md5sum, 33);
		//IOT_OTA_Ioctl(h_ota, IOT_OTAG_VERSION, version, 128);
		do {

			len = IOT_OTA_FetchYield(h_ota, buf_ota, OTA_BUF_LEN, 1);
			if (len > 0) {
				if (1 != fwrite(buf_ota, len, 1, fp)) {
					EXAMPLE_TRACE("write data to file failed");
					break;
				}
			}
			else {
				IOT_OTA_ReportProgress(h_ota, IOT_OTAP_FETCH_FAILED, NULL);
				EXAMPLE_TRACE("ota fetch fail");
			}

			/* get OTA information */
			IOT_OTA_Ioctl(h_ota, IOT_OTAG_FETCHED_SIZE, &size_downloaded, 4);
			IOT_OTA_Ioctl(h_ota, IOT_OTAG_FILE_SIZE, &size_file, 4);
			IOT_OTA_Ioctl(h_ota, IOT_OTAG_MD5SUM, md5sum, 33);
			IOT_OTA_Ioctl(h_ota, IOT_OTAG_VERSION, version, 128);

			percent = (size_downloaded * 100) / size_file;
			if (percent - last_percent > 1) 
			{
				last_percent = percent;
				IOT_OTA_ReportProgress(h_ota, (IOT_OTA_Progress_t)percent, NULL);//加载进度报告
				Print_NOTICE("IOT_OTA_Progress:--%d--\n", percent);
				IOT_OTA_ReportProgress(h_ota, (IOT_OTA_Progress_t)percent, "hello");
			}
			IOT_MQTT_Yield(pclient, 100);
		} while (!IOT_OTA_IsFetchFinish(h_ota));
		fclose(fp);
		IOT_OTA_Ioctl(h_ota, IOT_OTAG_CHECK_FIRMWARE, &firmware_valid, 4);
		if (0 == firmware_valid) {
			EXAMPLE_TRACE("The firmware is invalid");
		}
		else {
			EXAMPLE_TRACE("The firmware is valid");
			if (strlen(version) > 0) {
				setVersion(version);
				if (0 != IOT_OTA_ReportVersion(h_ota, version)) {
					EXAMPLE_TRACE("report OTA version failed");
				}
			}
			otaInfo.updatef = true;
		}
	}
}

int IOToMqttAliyun::getVersion(char* version_def)
{
	if (NULL == version_def)
	{
		return 0;
	}
	FILE *fp;
	if (NULL == (fp = fopen(otaInfo.version_file.c_str(), "r")))
	{
		EXAMPLE_TRACE("open file(%s) failed",otaInfo.version_file.c_str());
		size_t size = strlen(version_def);
		if (size <= 0)
		{
			return 0;
		}
		if (NULL != (fp = fopen(otaInfo.version_file.c_str(), "w")))
		{
			if (1 != fwrite(version_def, size, 1, fp))
			{
				EXAMPLE_TRACE("write data to file failed");
			}
			fclose(fp);
		}
	}
	else
	{
		char version_buf[128] = { 0 };
		fread(version_buf, 128, 1, fp);
		if (strlen(version_buf) > 0)
		{
			memcpy(version_def, version_buf, strlen(version_buf));
		}
		fclose(fp);
	}
	return static_cast<int>(strlen(version_def));
}

void IOToMqttAliyun::setVersion(char* version_def)
{
	if (NULL == version_def)
	{
		return;
	}
	size_t size = strlen(version_def);
	if (size <= 0) 
	{
		return;
	}
	FILE *fp;
	if (NULL != (fp = fopen(otaInfo.version_file.c_str(), "w")))
	{
		if (1 != fwrite(version_def, size, 1, fp))
		{
			EXAMPLE_TRACE("write data to file failed");
		}
		fclose(fp);
	}
}

void IOToMqttAliyun::update()
{
	if (!otaInfo.updatef)
		return;
	if(!pyfree::isExist(otaInfo.batfile))
	{
		//解压
		char cmd[512] = { 0 };
#ifdef WIN32
		sprintf(cmd, "%s \r\n"
			"cd %s \r\n"
			"%s e %s -o%s -y \r\n"
			"cd %s \r\n"
			, otaInfo.update_dir.substr(0, 2).c_str()
			, otaInfo.update_dir.c_str()
			, otaInfo.zip_path.c_str()
			, otaInfo.update_file.c_str()
			, otaInfo.update_dir.c_str()
			, otaInfo.appDir.c_str());
#else
		sprintf(cmd, "#!/bin/sh \n"
			"cd %s \n"
			"%s -xzvf %s -C %s \n"
			"cd %s \n"
			, otaInfo.update_dir.c_str()
			, otaInfo.zip_path.c_str()
			, otaInfo.update_file.c_str()
			, otaInfo.update_dir.c_str()
			, otaInfo.appDir.c_str());
#endif // WIN32
		Print_NOTICE("cmd:%s\n", cmd);
		if (!pyfree::writeToFile(cmd, otaInfo.batfile, "w"))
		{
			Print_WARN("write %s fail!\n", otaInfo.batfile.c_str());
		}
	}
	system(otaInfo.batfile.c_str());

	//停止 重命名 拷贝 启动
	for (std::map<int, SvcDesc>::iterator it = svcTriples.begin();
		it != svcTriples.end(); ++it) 
	{
		//stop svc
		SvcStop((char*)it->second.svc_name.c_str());
		//重命名旧app,拷贝新app
		
		char cmd[512] = { 0 };
#ifdef WIN32
		std::string svc_batfile = otaInfo.update_dir + otaInfo.pathdiv + it->second.app_name + ".bat";
		sprintf(cmd, "%s \r\n"
			"cd %s \r\n"
			"rename  %s %s_%s \r\n"
			"copy /y %s%s%s %s \r\n"
			"cd %s \r\n"
			, it->second.app_dir.substr(0, 2).c_str()
			, it->second.app_dir.c_str()
			, it->second.app_name.c_str()
			, pyfree::getCurrentTimeByFormat("%04d%02d%02dT%02d%02d%02d").c_str()
			, it->second.app_name.c_str()
			, otaInfo.update_dir.c_str()
			, otaInfo.pathdiv.c_str()
			, it->second.app_name.c_str()
			, it->second.app_dir.c_str()
			, otaInfo.appDir.c_str());
#else
		std::string svc_batfile = otaInfo.update_dir + otaInfo.pathdiv + it->second.app_name + ".sh";
		sprintf(cmd, "#!/bin/sh \n"
			"cd %s \n"
			"rename  %s %s_%s \n"
			"cp -f %s%s%s %s \n"
			"cd %s \n"
			, it->second.app_dir.c_str()
			, it->second.app_name.c_str()
			, pyfree::getCurrentTimeByFormat("%04d%02d%02dT%02d%02d%02d").c_str()
			, it->second.app_name.c_str()
			, otaInfo.update_dir.c_str()
			, otaInfo.pathdiv.c_str()
			, it->second.app_name.c_str()
			, it->second.app_dir.c_str()
			, otaInfo.appDir.c_str());
#endif // WIN32
		Print_NOTICE("cmd:%s\n", cmd);
		if (!pyfree::writeToFile(cmd, svc_batfile, "w"))
		{
			Print_WARN("write %s fail!\n", svc_batfile.c_str());
		}
		system(svc_batfile.c_str());
		//start
		SvcStart((char*)it->second.svc_name.c_str());
	}
	otaInfo.updatef = false;
}

void* IOToMqttAliyun::run()
{
	//test
	//otaInfo.updatef = true;
	//update();
	while (running)
	{
		if (NULL == pclient)
		{
			create();//create pclient;
		} else {
			if (IOT_MQTT_CheckStateNormal(pclient))
			{
				if (!initLink)
				{
					subscribe();
					initLink = true;
				}
				send();
				/* handle the MQTT packet received from TCP or SSL connection */
				IOT_MQTT_Yield(pclient, 200);
				ota_down();
				update();
			}
			else {
				if (initLink)
				{
					initLink = false;
				}
				/* handle the MQTT packet received from TCP or SSL connection */
				IOT_MQTT_Yield(pclient, 1000);
			}
		}
		HAL_SleepMs(100);
	}
	if (IOT_MQTT_CheckStateNormal(pclient))
	{
		unsubscribe();
	}
	destroy();
	return 0;
}
